Java 并发(三) - 活跃性和性能,死锁和显示锁

Posted by LinYaoTian on 2018-09-15

避免活跃性危险

死锁

定义:当一个线程永远占有一个锁,而其他线程尝试去获得这个锁,那么它们将永远被阻塞。

例如:当线程占有锁A时, 想要获得锁B, 但是同时, 另一个线程持有B, 并尝试获得A, 两个线程将永远等待下去, 这种情况是死锁的最简单的形式 。

一般来说,当死锁出现时,往往是在最糟糕的时候——高负载的情况下

锁顺序死锁

锁顺序死锁的原因是:两个线程以不同的顺序来获得相同的锁。

如果所有线程都以固定的顺序来获得锁,那么就不会出现锁顺序死锁的问题。

简单的锁顺序死锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Test {
private final Object left = new Object();
private final Object right = new Object();

public void leftRight(){
synchronized (left) {
synchronized (right) {
//do
}
}
}

public void rightLeft(){
synchronized (right) {
synchronized (left) {
//do
}
}
}
}

动态锁顺序死锁

动态的锁顺序死锁常发生在锁的参数传递上

例如一个转账操作, 将账号A的钱转到账号B中, 然后在转账过程中顺序对账号A和账号B使用同步锁, 同时又出现账号B向账号A转账, 这时应为参数顺序导致死锁:

1
2
3
A:transferMoney(myAccount, YourAccount, 10); 

B:transferMoney(YourAccount, myAccount, 10);

如果执行的时机不对,可能发生A持有myAccount,等待YourAccountB持有YourAccount,等待myAccount,这样导致死锁。

1
2
3
4
5
6
7
8
9
10
11
12
public void transferMoney(Account fromAccount, Account toAccount, DollarAmount amount) {  
synchronized (fromAccount) {
synchronized (toAccount) {
if (fromAccount.getBalance().compareTo(amount)<0) {
throw new RuntimeException();
} else {
fromAccount.debit(amount);
toAccount.credit(amount);
}
}
}
}

动态的锁顺序死锁还是由于获取锁的顺序不同导致的,为了解决这个问题,必须定义锁的顺序,并且整个程序都必须按照这个顺序来获取锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public void transferMoney(final Account fromAccount, final Account toAccount, final DollarAmount amount) {  
class Helper {
public void transfer() {
if (fromAccount.getBalance().compareTo(amount) < 0) {
throw new RuntimeException();
} else {
fromAccount.debit(amount);
toAccount.credit(amount);
}
}
}
// 通过唯一hashcode来统一锁的顺序, 如果account具有唯一键, 可以采用该键来作为顺序.
int fromHash = System.identityHashCode(fromAccount);
int toHash = System.identityHashCode(toAccount);
if (fromHash < toHash) {
synchronized (fromAccount) {
synchronized (toAccount) {
new Helper().transfer();
}
}
} else if (fromHash > toHash) {
synchronized (toAccount) {
synchronized (fromAccount) {
new Helper().transfer();
}
}
} else {
synchronized (tieLock) { // 针对fromAccount和toAccount具有相同的hashcode
synchronized (fromAccount) {
synchronized (toAccount) {
new Helper().transfer();
}
}
}
}
}

协作对象之间的死锁

某些获得多个锁的操作并不明显,很难察觉

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class Taxi{
private final Dispather dispather;

public Taxi(Dispather dispather){
this.dispather = dispather;
}

public synchronized String getLocation(){
return null;
}

public synchronized void setLocation(){
dispather.notifyTaxi();
}
}

class Dispather{

private final Taxi taxi;

public Dispather(Taxi taxi){
this.taxi = taxi;
}

public synchronized void notifyTaxi(){

}

public synchronized void getInfo(){
taxi.getLocation();
}
}

代码看起来没有明显地锁顺序问题,但TaxisetLocation()本身已经加了内置锁,然后调用dispather.notifyTaxi()notifyTaxi()也加了锁,所以这个方法实际获取了两个锁,同理Dispather。因此也存在获取锁的顺序导致死锁的问题。

如果在持有锁时调用某个外部方法,那么可能会导致死锁

死锁的避免与分析

开放调用:如果在调用某个方法时不需要持有锁。(同步代码块在方法内部仅用于保护那些涉及共享状态的操作)

使用开放调用可以将加锁逻辑进行封装,减少死锁的发生。

  1. 有界线程池/资源池与相互依赖的任务不能一起使用
  2. 如果要获取多个锁,必须考虑锁的顺序,尽可能使用开放调用。
  3. 使用显示锁检测死锁,并且可以从死锁中恢复过来
  4. 通过线程转储信息来分析死锁

其它活跃性危险

  1. 饥饿:由于线程无法访问它所需要的资源而不能继续执行时,就发生了饥饿。
  2. 糟糕的响应性也很影响活跃性。
  3. 活锁:尽管没有被阻塞, 线程却仍然不能继续, 因为它不断重试相同的操作, 却总是失败。活锁通常发生在消息处理应用程序中。

性能和可伸缩性

应用程序的性能可以用多个指标来衡量:服务时间,延迟时间,吞吐率,效率,可伸缩性和容量。
可伸缩性:当增加计算机资源时(内存,CPU,存储容量或IO带宽),程序的吞吐量或者处理能力要相应得提高(因此串行的可伸缩性是很差的)。

使用线程的性能开销

这些开销包括:线程之间的协调(加锁,内存同步),上下文切换,线程创建和销毁和线程的调度等。如果过度使用线程,这些开销甚至大于由于提高吞吐量、响应性或者计算能力所带来的性能提升。

上下文切换

如果可运行的线程数大于 CPU 的数量,那么操作系统会将某个正在运行的线程调度出来,保存当前线程执行的上下文(以便下次再次调度能恢复进度),调度其他线程使用 CPU,并将新调度的上下文设置为当前的上下文。

内存同步

synchronizedvolatile 提供的可见性保证中,可能会使用一些特殊的指令——内存栅栏(Barrier)。内存栅栏可以刷新缓存,是缓存无效,这可能会对性能带来影响,因为它将抑制一些编译器优化。在内存栅栏中,大多数操作时不能重排序的。

现在的 JVM 能通过优化去掉一下不会发生竞争的锁,以减少不必要的性能开销。如果对一个锁对象只能由当前线程访问(通常是局部变量),那么 JVM 会去掉这个锁的获取。

例如 JVM 会去掉下面这个锁:

1
2
3
synchronized(new Object()){
//
}

一些更完备的 JVM 会通过逸出分析,找出不会发布到堆的本地对象引用(即这个引用是局部变量)。例如:

1
2
3
4
5
6
7
public String getNames(){
List<String> names = new Vector<String>();
names.add("Jack");
names.add("Tom");
names.add("Jerry");
return names.toString();
}

getNames() 中,对Vector的唯一引用就是names,它是一个线程局部变量。按道理来说,这个方法在执行过程中,会将Vector的锁获取并释放 4 次(每次addtoString)。但是,编译器会分析这些调用都是本地引用,因此可以去掉这 4 次对锁的获取释放。

如果当前使用的JVM不支持逸出分析,编译器也可以执行锁粒度粗化,即将相邻的同步代码块用同一个锁合并起来。在上面的代码中,编译器会把3个add操作和1个toString操作合并为1个获取释放锁操作。

阻塞

当在锁发生竞争时,失败的线程肯定会被阻塞。当线程无法获取某个锁或在某个条件等待或在IO操作阻塞时,需要被挂起,这个过程将包括两次额外的上下文切换,以及必要的操作系统和缓存操作。

减少锁竞争提升性能

有两个因素将影响在锁上发生竞争的可能性:锁的请求频率,以及每次持有锁的时间。
三种方式降低锁的竞争程度:

  • 减少锁的持有时间
  • 降低锁的请求频率
  • 使用其它带有协调机制的锁

缩小锁的范围以减少锁的持有时间

原始代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class AttributeStore{
private final Map<String,String> attributes = new HashMap<String,String>();

public synchronized boolean userLocationMatches(String name,String regexp){
String key = "user." + name + ".location";
String location = arritubes.get(key);
if(location == null){
return false;
}else{
return Pattern.matches(regexp,location);
}
}
}

改进:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class AttributeStore{
private final Map<String,String> attributes = new HashMap<String,String>();

public boolean userLocationMatches(String name,String regexp){
String key = "user." + name + ".location";
synchronized(this){
String location = arritubes.get(key);
}
if(location == null){
return false;
}else{
return Pattern.matches(regexp,location);
}
}
}

此外还可以通过将线程安全性委托给其他的类来进一步提升它的性能,即使用线程安全的HashTablesynchroniedMapConcurrentHashMap 代替 HashMap

减锁分解低请求锁的频率

1
2
3
4
5
6
7
8
9
10
11
12
public class Server{
private final Set<String> users = new HashSet<String>();
private final Set<String> querys = new HashSet<String>();

public synchronized void addUser(String user){
users.add(user);
}

public synchronized void addQuery(String query){
querys.add(query);
}
}

对于上面这个代码,addUseraddQuery是对两个不同对象的操作,如果为两个操作都加上同一个锁,addUseraddQuery不能同时并发,但更好的方式是应该让addUseraddQuery可以同时执行,因为它们不影响彼此。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class Server{
private final Set<String> users = new HashSet<String>();
private final Set<String> querys = new HashSet<String>();

public void addUser(String user){
synchronized(users){
users.add(user);
}
}

public void addQuery(String query){
synchronized(querys){
querys.add(query);
}
}
}

代码中不使用 Server 锁来 保护用户的状态和查询状态,而每个状态都通过一个锁来保护。即将一个锁分解为两个锁。

锁分段降低请求锁的频率

锁分段:将锁分解技术进一步拓展为对一组独立对象的锁进行分解。

ConcurrentHashMap 的实现中使用了一个包含16个锁的数组,每个锁保护所有散列桶的1/16,其中第N个散列桶由第(N mod 16)个锁来保护。

缺点:要获取多个锁来实现独占访问将更加困难并且开销更高。(例如在ConcurrentHashMap )需要拓展映射范围,以及重新计算键值的哈希散列值要分布到更大的桶集合中时,就需要获取分段所在集合中所有的锁)

使用其它带有协调机制的锁替代独占锁

例如并发容器,读-写锁,不可变对象和原子变量。

显式锁(Lock)

与内置锁不同,Lock 提供了一种无条件的,可轮询的,定时的以及可中断的锁获取操作,所有加锁和解锁都是显式的。

1
2
3
4
5
6
7
8
public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}

ReentrantLock 和 synchronized 的比较

  1. ReentrantLock 实现了 Lock 接口,提供了与synchronized同样的互斥性和可见性,也同样提供了可重入性。
  2. 内置锁存在一些功能限制:无法中断一个正在等待获取锁的线程,无法获取一个锁时无限得等待下去。ReentrantLock更加灵活,能提供更好的活跃性和性能
  3. 内置锁的释放时自动的,而ReentrantLock的释放必须在finally手动释放
1
2
3
4
5
6
7
8
Lock lock = new ReentrantLock();
lock.lock();
///
try{

} finally{
lock.unlock();
}

总结:

ReentrantLock在性能上优于内置锁,但是内置所为更多工作人员熟悉,并且代码简洁,而ReentrantLock的危险性比synchronized更高(通常是忘记调用unlock),因此仅当需要一些高级功能(可定时,可轮询与可中断的锁获取操作,公平队列,以及非块结构的锁),synchronized不能满足需求时,使用ReentrantLock,否则还是应该使用synchronized

ReentrantLock 的高级功能

可中断的锁获取操作

1
void lockInterruptibly() throws InterruptedException;

lockInterruptibly()可抛出一个InterruptedException,可在捕获异常之后进行操作。

带有时间限制的加锁

1
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

同样的,时间锁也可以响应中断

非块结构的加锁

公平锁和非公平锁

ReentrantLock的构造函数中提供了两种公平性选择:创建一个非公平的锁(默认),或者创建一个公平锁。

1
2
3
4
5
6
7
8
//默认不公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
//选择公平或不公平
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
  • 公平锁:线程按照它们发出请求的顺序来获得锁。
  • 非公平锁:允许插队,当一个线程请求非公平锁时,如果在发出请求的同时该锁的状态变为可用,那么这个线程将跳过队列中所有等待线程并获得该锁。

非公平锁的性能一般大于高于公平锁,因为恢复一个被挂起的线程与该线程真正开始运行之间存在着严重的延迟。
假设线程A持有一个锁,并且线程B请求这个锁,由于A已经持有这个锁了,所以B会被挂起。当A释放锁时,B开始被唤醒。如果B唤醒的时间比较长,与此同时,线程C也请求这个锁,并且在B完全唤醒之前,C获得使用并且释放了这个锁,这就达到了双赢的局面。

读 - 写锁

ReentrantLock是一种互斥的操作,虽然能避免“写—写”冲突,也避免了“写—读”冲突,但也避免了“读—读”冲突。在多数情况下,操作都是读操作,那么如果允许“读—读”操作,将会大幅度得提升性能。
这时,可以使用读—写锁:一个资源可以被多个读操作访问,或者被一个写操作访问,但两者不能同时访问。

1
2
3
4
public interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}

读—写锁有一下需要考虑的点:

  1. 当一个写线程释放锁时,队列中同时存在一个写线程和读线程,哪个应该先被调度?
    在公平锁中,等待时间最长的线程优先获得锁;在非公平锁中,线程调度的顺序是不确定的。
  2. 如果锁由读线程持有,当前写线程正在等待,如果新到达的读线程能够插队立即获得锁的访问权?还是应该在写线程后面等待?
    在公平锁中,如果持有锁的是读线程,而写线程正在请求写入锁,那么其他读线程都不能获取读取锁,直到写线程使用完并释放了写入锁。
  3. 读取锁和写入锁是否是可重入的?
    读取所和写入锁都是可重入的。
  4. 如果一个线程持有写入锁,那么它能够在不释放写入锁的同时获取读取锁?即写入锁降级为读取锁 ?
    写入锁降级为读取锁是允许的。
  5. 读取锁能够优先于其它正在等待的写入锁和读取锁而升级为一个写入锁?
    读取锁升级为写入锁是不允许的,因为这可能造成死锁(如果两个读取锁都升级为写入锁,那么两者都不会释放读取锁)。

一个用读—写锁来包装map

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class LockMap<K, V>{

private final Map<K, V> map;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock rLock = lock.readLock();
private final Lock wLock = lock.writeLock();

public Test(Map<K, V> map){
this.map = map;
}

public void put(K key, V value){
wLock.lock();
try {
map.put(key, value);
} finally{
wLock.unlock();//一定要记得释放
}
}

public V get(K key){
rLock.lock();
try {
return map.get(key);
} finally{
rLock.unlock();//一定要记得释放
}
}
}